超简单的Python教程系列

您所在的位置:网站首页 asyncio await作用 超简单的Python教程系列

超简单的Python教程系列

#超简单的Python教程系列| 来源: 网络整理| 查看: 265

超简单的Python教程系列——异步

Python 3.5 引入了两个新关键字:​ ​async​ ​​和​ ​await​ ​​。这些看似神奇的关键字完全可以在没有任何线程的情况下实现类似线程的并发。在本教程中,我们将介绍异步编程的原因,并通过构建我们自己的小型异步类框架来说明Python的​ ​async/await​ ​关键字如何在内部工作。

为什么要异步编程?

要了解异步编程的动机,我们必须首先了解是什么限制了我们的代码速度。理想情况下,我们希望我们的代码以光速运行,并立即跳过我们的代码,没有任何延迟。然而,由于两个因素,代码的实际运行速度要慢得多:

[En]

To understand the motivation of asynchronous programming, we must first understand what limits the speed of our code. Ideally, we want our code to run at the speed of light and skip our code immediately without any delay. However, the code actually runs much slower due to two factors:

CPU时间(处理器执行指令的时间) IO时间(等待网络请求或存储读/写的时间)

当我们的代码在等待 IO 时,CPU 基本上是空闲的,等待某个外部设备响应。通常,内核会检测到这一点并立即切换到执行系统中的其他线程。因此,如果我们想加快处理一组 IO 密集型任务,我们可以为每个任务创建一个线程。当其中一个线程停止,等待 IO 时,内核将切换到另一个线程继续处理。

这在实践中效果很好,但有两个缺点:

[En]

This works well in practice, but has two disadvantages:

线程有开销(尤其是在 Python 中) 我们无法控制内核何时选择在线程之间切换 [En]

We cannot control when the kernel chooses to switch between threads*

例如,如果我们想要执行 10,000 个任务,我们要么必须创建 10,000 个线程,这将占用大量 RAM,要么我们需要创建较少数量的工作线程并以较少的并发性执行任务。此外,最初生成这些线程会占用 CPU 时间。

因为内核可以随时选择在线程之间切换,所以在我们的代码中随时可能发生竞争。

[En]

Because the kernel can choose to switch between threads at any time, competition can occur at any time in our code.

引入异步

在传统的基于同步线程的代码中,内核必须检测线程何时是IO绑定的,并选择在线程之间随意切换。使用 Python 异步,程序员使用关键字​ ​await​ ​确认声明 IO 绑定的代码行,并确认授予执行其他任务的权限。例如,考虑以下执行Web请求的代码:

async def request_google(): reader, writer = await asyncio.open_connection('google.com', 80) writer.write(b'GET / HTTP/2\n\n') await writer.drain() response = await reader.read() return response.decode()

在这里,在这里,我们看到该代码在两个地方​ ​await​ ​​。​​因此,在等待我们的字节被发送到服务器(​ ​writer.drain()​ ​​)时,在等待服务器用一些字节(​ ​reader.read()​ ​)回复时,我们知道其他代码可能会执行,全局变量可能会更改。然而,从函数开始到第一次等待,我们可以确保代码逐行运行,而不会切换到运行程序中的其他代码。这就是异步的美妙之处。

​ ​asyncio​ ​是一个标准库,可以让我们用这些异步函数做一些有趣的事情。例如,如果我们想同时向Google执行两个请求,我们可以:

async def request_google_twice(): response_1, response_2 = await asyncio.gather(request_google(), request_google()) return response_1, response_2

当我们调用​ ​request_google_twice()​ ​时,神奇的​ ​asyncio.gather​ ​会启动一个函数调用,但是当我们调用时​ ​await writer.drain()​ ​,它会开始执行第二个函数调用,这样两个请求就会并行发生。然后,它等待第一个或第二个请求的​ ​writer.drain()​ ​调用完成并继续执行该函数。

最后,有一个重要的细节被遗漏了:​ ​asyncio.run​ ​。要从常规的 [同步] Python 函数实际调用异步函数,我们将调用包装在​ ​asyncio.run(...)​ ​:

async def async_main(): r1, r2 = await request_google_twice() print('Response one:', r1) print('Response two:', r2) return 12 return_val = asyncio.run(async_main())

请注意,如果我们只调用​ ​async_main()​ ​而不调用​ ​await ...​ ​或者 ​ ​asyncio.run(...)​ ​,则不会发生任何事情。这只是由异步工作方式的性质所限制的。

那么,异步究竟是如何工作的,这些神奇的​ ​asyncio.run​ ​​和​ ​asyncio.gather​ ​函数有什么作用呢?阅读下文以了解详情。

异步是如何工作的

要了解​ ​async​ ​的魔力​,我们首先需要了解一个更简单的 Python 构造:生成器(在前面《 生成器和协程 》,如果你没看过,可以去我的主页看看这篇文章,再回来学习这个就会很容易)。

生成器

生成器是 Python 函数,它逐个返回一系列值(可迭代)。例如:

def get_numbers(): print("|| get_numbers begin") print("|| get_numbers Giving 1...") yield 1 print("|| get_numbers Giving 2...") yield 2 print("|| get_numbers Giving 3...") yield 3 print("|| get_numbers end") print("| for begin") for number in get_numbers(): print(f"| Got {number}.") print("| for end") | for begin || get_numbers begin || get_numbers Giving 1... | Got 1. || get_numbers Giving 2... | Got 2. || get_numbers Giving 3... | Got 3. || get_numbers end | for end

因此,我们看到,对于for循环的每个迭代,我们在生成器中只执行一次。我们可以使用Python的​ ​next()​ ​函数更明确地执行此迭代:

In [3]: generator = get_numbers() In [4]: next(generator) || get_numbers begin || get_numbers Giving 1... Out[4]: 1 In [5]: next(generator) || get_numbers Giving 2... Out[5]: 2 In [6]: next(generator) || get_numbers Giving 3... Out[6]: 3 In [7]: next(generator) || get_numbers end StopIteration:

这与异步函数的行为非常相似。正如异步函数从函数开始直到第一次等待时连续执行代码一样,我们第一次调用​ ​next()​ ​​时,生成器将从函数顶部执行到第一个​ ​yield​ ​ 语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但返回一些不同的东西来使用生成器创建类似异步的函数。

使用生成器进行异步

让我们使用生成器来创建我们自己的小型异步框架。

[En]

Let’s use the generator to create our own small asynchronous framework.

但是,为简单起见,让我们将实际 IO 替换为睡眠(即。​ ​time.sleep​ ​)。让我们考虑一个需要定期发送更新的应用程序:

def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): time.sleep(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

因此,如果我们调用​ ​send_updates(3, 1.0)​ ​,它将输出这三条消息,每条消息间隔 1 秒:

[1.0] Sending update 1/3. [1.0] Sending update 2/3. [1.0] Sending update 3/3.

现在,假设我们要同时运行几个不同的时间间隔。例如,​ ​send_updates(10, 1.0)​ ​​,​ ​send_updates(5, 2.0)​ ​​和​ ​send_updates(4, 3.0)​ ​。我们可以使用线程来做到这一点,如下所示:

threads = [ threading.Thread(target=send_updates, args=(10, 1.0)), threading.Thread(target=send_updates, args=(5, 2.0)), threading.Thread(target=send_updates, args=(4, 3.0)) ] for i in threads: i.start() for i in threads: i.join()

这可行,在大约 12 秒内完成,但使用具有前面提到的缺点的线程。让我们使用生成器构建相同的东西。

在演示生成器的示例中,我们返回了整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述要等待的IO的对象。在我们的例子中,我们的”IO”只是一个计时器,它将等待一段时间。因此,让我们创建一个计时器对象,用于此目的:

class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration

现在,让我们从我们的函数中产生这个而不是调用​ ​time.sleep​ ​:

def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): yield AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

现在,每次我们调用​ ​send_updates(...)​ ​​时调用​ ​next(...)​ ​​,我们都会得到一个​ ​AsyncTimer​ ​对象,告诉我们直到我们应该等待什么时候:

generator = send_updates(3, 1.5) timer = next(generator) # [1.5] Sending update 1/3. print(timer.done_time - time.time()) # 1.498...

由于我们的代码现在实际上并没有调用​ ​time.sleep​ ​​,我们现在可以同时执行另一个​ ​send_updates​ ​调用。

因此,为了把这一切放在一起,我们需要后退一步,意识到一些事情:

[En]

So, in order to put it all together, we need to step back and realize something:

生成器就像部分执行的函数,等待一些 IO(计时器)。 每个部分执行的函数都有一些 IO(计时器),它在继续执行之前等待。 因此,我们程序的当前状态是每个部分执行的函数(生成器)和该函数正在等待的 IO(计时器)对的对列表 现在,要运行我们的程序,我们只需要等到某个 IO 准备就绪(即我们的一个计时器已过期),然后再向前一步执行相应的函数,得到一个阻塞该函数的新 IO。

实现这一逻辑为我们提供了以下信息:

[En]

Implementing this logic provides us with the following information:

# Initialize each generator with a timer of 0 so it immediately executes generator_timer_pairs = [ (send_updates(10, 1.0), AsyncTimer(0)), (send_updates(5, 2.0), AsyncTimer(0)), (send_updates(4, 3.0), AsyncTimer(0)) ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair # Wait until this timer is ready time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: # Execute one more step of this function new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: # When the function is complete pass

有了这个,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它会引发​ ​StopIteration​ ​,并且当我们不再有部分执行的函数(生成器)时,我们的函数就完成了

现在,我们把它包装在一个函数中,我们得到了类似于​ ​asyncio.run​ ​的东西。结合​ ​asyncio.gather​ ​运行:

def async_run_all(*generators): generator_timer_pairs = [ (generator, AsyncTimer(0)) for generator in generators ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) ) 使用 async/await 进行异步

实现我们的caveman版本的​ ​asyncio​ ​​的最后一步是支持Python 3.5中引入的​ ​async/await​ ​​语法。​ ​await​ ​​的行为类似于​ ​yield​ ​,只是它不是直接返回提供的值,而是返回​ ​next((...).__await__())​ ​。​ ​async​ ​​函数返回”协程”,其行为类似于生成器,但需要使用​ ​.send(None)​ ​而不是​ ​next()​ ​(请注意,正如生成器在最初调用时不返回任何内容一样,异步函数在逐步执行之前不会执行任何操作,这解释了我们前面提到的)。

因此,鉴于这些信息,我们只需进行一些调整即可将我们的示例转换为​ ​async/await​ ​。以下是最终结果:

class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration def __await__(self): yield self async def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): await AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count)) def _wait_until_io_ready(ios): min_timer = min(ios, key=lambda x: x.done_time) time.sleep(max(0, min_timer.done_time - time.time())) return ios.index(min_timer) def async_run_all(*coroutines): coroutine_io_pairs = [ (coroutine, AsyncTimer(0)) for coroutine in coroutines ] while coroutine_io_pairs: ios = [io for cor, io in coroutine_io_pairs] ready_index = _wait_until_io_ready(ios) coroutine, _ = coroutine_io_pairs.pop(ready_index) try: new_io = coroutine.send(None) coroutine_io_pairs.append((coroutine, new_io)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) )

我们有了它,我们的迷你异步示例完成了,使用​ ​async/await​ ​​. 现在,您可能已经注意到我将 timer 重命名为 io 并将查找最小计时器的逻辑提取到一个名为​ ​_wait_until_io_ready​ ​. 这是有意将这个示例与最后一个主题联系起来:真实 IO。

在这里,我们完成了我们的小型异步示例,使用了​ ​async/await​ ​​。现在,你可能已经注意到我将​ ​timer​ ​​重命名为io,并将用于查找最小计时器的逻辑提取到一个名为​ ​_wait_until_io_ready​ ​的函数中。这是为了将本示例与最后一个主题: 真正的IO ,连接起来。

真正的 IO(而不仅仅是定时器)

所以,所有这些例子都很棒,但是它们与真正的 asyncio 有什么关系,我们希望在真正 IO 上等待 TCP 套接字和文件读/写?嗯,美丽就在那个​ ​_wait_until_io_ready​ ​​功能中。为了让真正的 IO 正常工作,我们所要做的就是创建一些​ ​AsyncReadFile​ ​​类似于​ ​AsyncTimer​ ​​包含​ ​文件描述符​ ​​的新对象。然后,​ ​AsyncReadFile​ ​​我们正在等待的对象集对应于一组文件描述符。最后,我们可以使用函数 (syscall) ​ ​select()​ ​等待这些文件描述符之一准备好。由于 TCP/UDP 套接字是使用文件描述符实现的,因此这也涵盖了网络请求。

所以,所有这些例子都很好,但它们与真正的异步IO有什么关系呢?我们希望等待实际的IO,比如TCP套接字和文件读/写?好吧,其优点在于​ ​_wait_until_io_ready​ ​​函数。要使真正的IO工作,我们需要做的就是创建一些新的​ ​AsyncReadFile​ ​​,类似于​ ​AsyncTimer​ ​,它包含一个 文件描述符 。然后,我们正在等待的一组​ ​AsyncReadFile​ ​​对象对应于一组文件描述符。最后,我们可以使用函数(​ ​syscall​ ​​)​ ​select()​ ​等待这些文件描述符之一准备好。由于TCP/UDP套接字是使用文件描述符实现的,因此这也涵盖了网络请求。

总结

我们有了它,Python 异步从头开始​​。虽然我们深入研究了它,但仍有许多细微之处没有涉及。例如,要从另一个生成器函数调用类似生成器异步的函数,我们将使用​ ​yield from​ ​​,我们可以通过将参数传递到​ ​.send(...)​ ​​来从​ ​async​ ​​函数返回值。关于异步IO特定构造还有很多其他主题,还有很多其他的微妙之处,比如异步生成器和取消任务,但我们就把它交给你们下来细细研究了。

Original: https://blog.csdn.net/m0_72557783/article/details/126847728Author: 程序员老华Title: 超简单的Python教程系列——异步

相关阅读 Title: Python Flask框架建立项目 Flask项目建立

我使用的IDE是pycharm2020 专业版。

1、打开pycharm新建项目。点击”文件”->”新建项目”->”Flask”,然后输入保存目录。在这里选择编译环境,我使用的是Python 3.7(可以根据的实际情况选择),下面的”More Settings”默认就好。

2、创建的项目如下所示。

3、对于其它版本的pycharm可能没有Flask的快捷创建方式,我们可以自己创建,其实”static”和”templates”是两个空的文件目录。我们可以直接创建一个python项目。

创建的项目结构为空,如下图所示。

[En]

The created project structure is empty, as shown below.

我们点击项目,新建一个Python File,命名为app.py,然后建立两个”目录”,分别命名为”static”、”templates”即可,此时两个目录都是空的。

在新建的”app.py”中复制之前一样的代码,然后运行项目,可以看见我们的项目已经启动,点击或者在网页输入”http://127.0.0.1:5000/”,即可打开网页。

from flask import Flask app = Flask(__name__) @app.route('/') def hello_world(): return 'Hello World!' if __name__ == '__main__': app.run()

网页展示的内容为”Hello World!”。

至此,我们flask框架的建立便完成了。

书接上文。按照我们新建flask的框架,添加我们的代码。

Flask项目编程

1、基本的配置

from flask import Flask, request, render_template,jsonify app = Flask(__name__) app.config['DEBUG']= True app.config['SEND_FILE_MAX_AGE_DEFAULT']= timedelta(seconds=1) if __name__ == '__main__': app.run(debug=True)

2、设置展示网页的路由和对应的函数。这里我们设置了两个路由展示我们的两个网页,这两个网页的文件是保存在”templates”目录下。

@app.route('/visual_histroy',methods=['post','get']) def get_histroy(): return render_template("histroy_time.html") @app.route('/visual_current',methods=['post','get']) def get_current(): return render_template("current_time.html")

3、设置其它的路由和对应的函数用于前后端进行数据交互。对于每个路由我们都接受”post”和”get”请求方式,其中一些路由把”get”请求返回默认数据。前端向后端传输的数据我们通过”request”对象可以获取;根据”request”对象的数据我们可以获取相应的数据,并返回给前端;后端向前端传输的数据我们通过”flask”里的”jsonify”,转换为json格式传输。

from datetime import timedelta from base_fun import get_now_time,read_data from web_data import Histroy_data,pm25_86 @app.route('/data',methods=['post','get']) def get_cur_data(): if requesthod == "POST": return_dict = Histroy_data(request.values.get('city'), request.values.get('time')) return jsonify(return_dict) else: return_dict = Histroy_data("北京","2021年01月") return jsonify(return_dict) @app.route('/geo_map',methods=['post','get']) def get_map(): province_name = request.values.get('province_name') print("获取地图:",province_name) filename = "./static/js/province/"+province_name+".json" return_dict = read_data(filename) return jsonify(return_dict) @app.route('/data_country',methods=['post','get']) def get_country_data(): if requesthod == "POST": return_dict = pm25_86(request.values.get('province_name')) if return_dict: print("获取到数据:",return_dict) return jsonify(return_dict) else: return None else: return_dict = pm25_86('全国') if return_dict: print("获取到数据:", return_dict) return jsonify(return_dict) else: return None @app.route('/time',methods=['post','get']) def get_time(): if request.values.get('name') == '现在': return get_now_time()

以上为flask框架里面有关路由的设置,接下来分别把flask调用的一些方法,和前端进行实现即可。

Original: https://blog.csdn.net/youngbingbing/article/details/119847144Author: youngbingbingTitle: Python Flask框架建立项目

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/318542/

转载文章受原作者版权保护。转载请注明原作者出处!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3